package com.nx.platform.es.biz.esspider.reactor;


import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.nx.platform.es.biz.esspider.process.ProcessorService;
import com.nx.platform.es.biz.esspider.source.SourceService;
import com.nx.platform.es.common.utils.ServiceListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * A Guava {@link Service} that owns a group of {@link SourceService}s and
 * {@link ProcessorService}s and drives their lifecycle as one unit.
 *
 * <p>Lifecycle, as implemented here:
 * <ol>
 *   <li>{@link #startUp()} calls {@link #init()} (where subclasses register children), starts
 *       each processor and waits for it to be running, then starts every source.</li>
 *   <li>{@link #run()} blocks until every source has terminated.</li>
 *   <li>{@link #triggerShutdown()} / {@link #shutDown()} stop the sources first and the
 *       processors afterwards, waiting for each child to terminate.</li>
 * </ol>
 *
 * <p>The child lists are plain, unsynchronized lists; registration is expected to happen inside
 * {@link #init()} only.
 *
 * @author
 * @date 2018/01/26
 */
public abstract class Reactor extends AbstractExecutionThreadService {

    protected static final Logger LOGGER = LogManager.getLogger(Reactor.class);

    /** Registered sources, in registration order. */
    private final List<SourceService> sources;
    /** Registered processors, in registration order. */
    private final List<ProcessorService> processors;

    /**
     * Creates an empty reactor and attaches a {@link ServiceListener} to the reactor itself.
     */
    protected Reactor() {
        this.sources = new LinkedList<>();
        this.processors = new LinkedList<>();
        // NOTE(review): addListener is overridable and is invoked here with a partially
        // constructed instance; a subclass override would run before the subclass's own
        // fields are initialized. Kept as-is to preserve existing dispatch behavior.
        addListener(this);
    }

    /**
     * Hook invoked at the beginning of {@link #startUp()}, before any child service is started.
     * Implementations register their children via {@link #addSource(SourceService)} and
     * {@link #addProcessor(ProcessorService)}.
     */
    protected abstract void init();

    /**
     * Registers a source. The source is started by {@link #startUp()} and stopped on shutdown.
     *
     * @param source the source to register
     */
    protected void addSource(SourceService source) {
        sources.add(source);
    }

    /**
     * Registers a processor. The processor is started by {@link #startUp()} (before any source)
     * and stopped on shutdown (after all sources).
     *
     * @param processService the processor to register
     */
    protected void addProcessor(ProcessorService processService) {
        processors.add(processService);
    }

    /**
     * Initializes the reactor and starts its children: processors one at a time, each awaited
     * until running, then all sources without waiting for them.
     *
     * <p>If starting a child throws (for example {@code awaitRunning()} throws
     * {@link IllegalStateException} because a processor failed), the children are stopped before
     * the exception is rethrown. {@code AbstractExecutionThreadService} does not call
     * {@link #shutDown()} when {@code startUp()} fails, so without this cleanup the children that
     * had already started would keep running.
     *
     * @throws Exception if {@link #init()} or starting a child fails
     */
    @Override
    protected void startUp() throws Exception {
        init();
        try {
            processors.forEach(p -> addListener(p).startAsync().awaitRunning());
            sources.forEach(s -> addListener(s).startAsync());
        } catch (RuntimeException | Error startFailure) {
            try {
                stopChildren();
            } catch (RuntimeException cleanupFailure) {
                // Keep the start-up failure as the primary error; record the cleanup problem.
                startFailure.addSuppressed(cleanupFailure);
            }
            throw startFailure;
        }
    }

    /**
     * Blocks until every registered source has terminated.
     *
     * @throws Exception if a source ends in the failed state ({@code awaitTerminated()} throws
     *                   {@link IllegalStateException} in that case)
     */
    @Override
    protected void run() throws Exception {
        sources.forEach(Service::awaitTerminated);
    }

    /**
     * Stops all sources and then all processors, waiting for each one to terminate. Children
     * that are already in the failed state are skipped.
     *
     * <p>Every child is attempted even if an earlier one fails while stopping; the first such
     * failure is rethrown afterwards with any later ones attached as suppressed.
     */
    @Override
    protected void triggerShutdown() {
        stopChildren();
    }

    /**
     * Stops the children once {@link #run()} has returned. Delegates to
     * {@link #triggerShutdown()} so that subclass overrides of that hook also apply here.
     *
     * @throws Exception if a child fails while stopping
     */
    @Override
    protected void shutDown() throws Exception {
        triggerShutdown();
    }

    /**
     * Looks up a registered source by its exact runtime class.
     *
     * @param clazz the concrete class of the wanted source
     * @param <T>   the source type
     * @return the first registered source whose class equals {@code clazz}, or {@code null} if
     *         there is none (subclasses of {@code clazz} do not match)
     */
    @Nullable
    public <T extends SourceService> T getSource(Class<T> clazz) {
        for (SourceService source : sources) {
            if (source.getClass().equals(clazz)) {
                // Exact class match was just verified, so the checked cast cannot fail.
                return clazz.cast(source);
            }
        }
        return null;
    }

    /**
     * Attaches a {@link ServiceListener} (constructed with this class's logger and the service's
     * {@code toString()}) to the given service, executed on a direct executor.
     *
     * @param service the service to observe
     * @return the same {@code service}, to allow call chaining
     */
    protected Service addListener(Service service) {
        Executor executor = MoreExecutors.directExecutor();
        service.addListener(new ServiceListener(LOGGER, service.toString()), executor);
        return service;
    }

    /**
     * Stops sources first, then processors, attempting every child before reporting a failure.
     */
    private void stopChildren() {
        RuntimeException failure = stopAll(sources, null);
        failure = stopAll(processors, failure);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Stops each non-failed service in order and waits for it to terminate.
     *
     * @param services the services to stop
     * @param failure  a failure collected so far, or {@code null}
     * @return the first failure seen (later ones are added to it as suppressed), or {@code null}
     */
    private static RuntimeException stopAll(List<? extends Service> services, RuntimeException failure) {
        for (Service service : services) {
            if (service.state() == State.FAILED) {
                continue;
            }
            try {
                service.stopAsync().awaitTerminated();
            } catch (RuntimeException e) {
                // A child may fail while stopping; remember it but keep stopping the rest.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        return failure;
    }

}
